package cn.piflow

import java.util.{Date, Properties}

import cn.piflow.local.SparkFlowEngine
import cn.piflow.rpc.RemoteFlowEngine
import cn.piflow.util.Logging
import org.apache.spark.sql.{SQLContext, SparkSession}

import scala.reflect.ClassTag

/**
 * Marker trait for events raised during flow execution.
 * Carries no members of its own; concrete event types extend it and are
 * delivered through `JobContext.notifyEvent`.
 */
trait FlowEvent {

}

/**
 * A key/value context shared across a flow run, giving access to named
 * arguments and the Spark execution environment.
 */
trait RunnerContext {
	/** Returns the argument registered under `name`, cast to `T`. */
	def arg[T](name: String): T;

	/** The SparkSession the flow runs on. */
	def sparkSession: SparkSession;

	/** The SQLContext associated with the flow run. */
	def sqlContext: SQLContext;

	/**
	 * Returns the argument keyed by the runtime class name of `T`.
	 *
	 * The previous `T: ClassTag` context bound was redundant — only the
	 * Manifest's `runtimeClass` is used (and a Manifest is itself a
	 * ClassTag) — so it has been dropped; call sites are unaffected.
	 */
	def argForType[T](implicit m: Manifest[T]): T =
		arg(m.runtimeClass.getName);

	/** Registers `value` under `name` and returns this context for chaining. */
	def arg[T](name: String, value: T): this.type;
}

/**
 * Context of a single scheduled job execution: the graph being run, the
 * instance identifier, and a channel for publishing flow events.
 */
trait JobContext extends RunnerContext {
	/** The flow graph this job executes. */
	def flowGraph: FlowGraph;

	/** Unique identifier of this job execution instance. */
	def jobInstanceId: String;

	/**
	 * Publishes an event raised during execution.
	 * Declared with an explicit `Unit` result type — the original used the
	 * deprecated procedure syntax, which is removed in Scala 3.
	 */
	def notifyEvent(event: FlowEvent): Unit;
}

/**
 * Context visible to a single processor (node) inside a running job.
 * Extends JobContext with the node's position in the flow graph.
 */
trait ProcessorContext extends JobContext {
	/** The enclosing job-level context. */
	def jobContext: JobContext;

	/** Identifier of the graph node this processor occupies. */
	def flowNodeId: Int;
}

/**
 * Read-only access to runtime statistics of flow jobs.
 */
trait StatService {
	/** Returns the statistics of the job identified by `jobId`. */
	def getStat(jobId: String): FlowJobStat;
}

/**
 * Lifecycle management for flow jobs: scheduling, immediate execution,
 * querying historic/running executions, and pause/resume/stop control.
 */
trait JobService {
	/** Schedules `flowGraph` according to `schedule` and returns the handle. */
	def schedule(flowGraph: FlowGraph, schedule: JobSchedule): ScheduledJob;

	/** Runs `flowGraph` immediately, waiting at most `timeout` milliseconds. */
	def run(flowGraph: FlowGraph, timeout: Long): ScheduledJob;

	/** Tests whether a job with the given id is known to this service. */
	def exists(jobId: String): Boolean;

	/** Number of times the job has been fired so far. */
	def getFireCount(jobId: String): Int;

	/** All completed executions across all jobs. */
	def getHistoricExecutions(): Seq[JobInstance];

	/** Completed executions of one job. */
	def getHistoricExecutions(jobId: String): Seq[JobInstance];

	/** All jobs currently registered with the scheduler. */
	def getScheduledJobs(): Seq[ScheduledJob];

	/** All executions currently in progress. */
	def getRunningJobs(): Seq[JobInstance];

	/** Executions of one job currently in progress. */
	def getRunningJobs(jobId: String): Seq[JobInstance];

	// The three control methods below originally used the deprecated
	// procedure syntax; the `Unit` result type is now explicit (required
	// in Scala 3, behavior unchanged).

	/** Resumes a previously paused job. */
	def resume(jobId: String): Unit;

	/** Pauses a scheduled job without removing it. */
	def pause(jobId: String): Unit;

	/** Stops the job and removes it from the scheduler. */
	def stop(jobId: String): Unit;
}

/**
 * One concrete execution of a scheduled job.
 */
trait JobInstance {
	/** Unique identifier of this execution. */
	def getId(): String;

	/** The scheduled job this execution belongs to. */
	def getScheduledJob(): ScheduledJob;

	/** Wall-clock time at which this execution started. */
	def getStartTime(): Date;

	/** Elapsed running time; presumably milliseconds — TODO confirm with implementations. */
	def getRunTime(): Long;
}

/**
 * Handle to a job registered with the scheduler, exposing its firing times.
 */
trait ScheduledJob {
	/** Unique identifier of the scheduled job. */
	def getId(): String;

	/** Next time the job is due to fire. */
	def getNextFireTime(): Date;

	/** Time the schedule becomes active. */
	def getStartTime(): Date;

	/** Time the schedule expires. */
	def getEndTime(): Date;

	/** Most recent time the job fired. */
	def getPreviousFireTime(): Date;
}

/**
 * Entry point for executing flow graphs, either on a repeating schedule
 * or as a one-shot run.
 */
trait FlowEngine {
	/**
	 * Schedules `flowGraph` for execution. The `start` builder configures
	 * when the job first fires (default: immediately) and the `run` builder
	 * how often it repeats (default: once).
	 */
	def schedule(flowGraph: FlowGraph, start: Start.Builder = Start.now, run: Repeat.Builder = Repeat.once): ScheduledJob = {
		val jobSchedule = new JobSchedule()
		// let each builder configure its aspect of the schedule
		start.apply(jobSchedule)
		run.apply(jobSchedule)
		getJobService().schedule(flowGraph, jobSchedule)
	}

	/** Runs `flowGraph` immediately; `timeout` defaults to 0. */
	def run(flowGraph: FlowGraph, timeout: Long = 0): ScheduledJob =
		getJobService().run(flowGraph, timeout)

	/** The job service backing this engine. */
	def getJobService(): JobService;

	/** The statistics service backing this engine. */
	def getStatService(): StatService;
}

/**
 * Factory for FlowEngine instances: from classpath configuration, as a
 * local Spark-backed engine, or as a client of a remote engine.
 */
object FlowEngine extends Logging {
	/**
	 * Bootstraps a (SparkSession, FlowEngine) pair from
	 * `/piflow-engine.properties` on the classpath. If the file is missing,
	 * a warning is logged and defaults apply (local engine). When
	 * `engine.location=remote`, connects to `server.http.url`; any other
	 * (or absent) value starts a local engine.
	 */
	def startByConf(): (SparkSession, FlowEngine) = {
		val props = new Properties();
		val url = this.getClass.getResource("/piflow-engine.properties");
		if (url != null) {
			logger.info(s"loading configurations from $url");
			val is = url.openStream();
			// close the stream even when Properties.load throws
			// (the original leaked the stream on load failure)
			try {
				props.load(is);
			}
			finally {
				is.close();
			}
		}
		else {
			logger.warn("failed to load piflow-engine.properties");
		}

		//TODO: by properties, cluster mode...
		val spark = SparkSession.builder.master("local[4]")
			.getOrCreate();

		// a null/missing engine.location safely falls through to the local case
		val engine =
			props.getProperty("engine.location") match {
				case "remote" => {
					val httpUrl = props.getProperty("server.http.url");
					FlowEngine.connect(httpUrl);
				}
				case _ => {
					FlowEngine.startLocal(spark);
				}
			}

		(spark, engine)
	}

	/** Starts an embedded engine on the given SparkSession and returns it started. */
	def startLocal(sparkSession: SparkSession): SparkFlowEngine = {
		val en = new SparkFlowEngine(sparkSession);
		en.start();
		en;
	}

	/** Creates a client engine talking to a remote server at `httpServletUrl`. */
	def connect(httpServletUrl: String) = new RemoteFlowEngine(httpServletUrl);
}